package com.fanjj.spark.util

import com.alibaba.fastjson.JSONObject
import com.fanjj.spark.conf.ConfigurationManager
import com.fanjj.spark.constant.Constants
import com.fanjj.spark.entity.TaskEntity
import com.fanjj.spark.test.MockData
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.hive.HiveContext

object SparkUtil {
  /**
    * 根据当前是否本地测试的配置
    * 决定，如何设置SparkConf的master
    */
  def setMaster(conf: SparkConf): Unit = {
    val local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)
    if (local) conf.setMaster("local")
  }

  /**
    * 获取SQLContext
    * 如果spark.local设置为true，那么就创建SQLContext；否则，创建HiveContext
    *
    * @param sc
    * @return
    */
  def getSQLContext(sc: SparkContext): SQLContext = {
    val local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)
    if (local) new SQLContext(sc)
    else new HiveContext(sc)
  }

  /**
    * 生成模拟数据
    * 如果spark.local配置设置为true，则生成模拟数据；否则不生成
    *
    * @param sc
    * @param sqlContext
    */
  def mockData(sc: JavaSparkContext, sqlContext: SQLContext): Unit = {
    val local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)
    if (local) MockData.mock(sc, sqlContext)
  }

  /**
    * 获取指定日期范围内的用户行为数据RDD
    *
    * @param sqlContext
    * @param taskParam
    * @return
    */
  def getActionRDDByDateRange(sqlContext: SQLContext,task:TaskEntity): RDD[(String,Row)] = {
    val startDate = task.getStartTime
    val endDate =task.getFinishTime
    val sql = "select * " + "from user_visit_action " + "where date>='" + startDate + "' " + "and date<='" + endDate + "'"
    //				+ "and session_id not in('','','')"
    val actionDF = sqlContext.sql(sql)

    /**
      * 这里就很有可能发生上面说的问题
      * 比如说，Spark SQl默认就给第一个stage设置了20个task，但是根据你的数据量以及算法的复杂度
      * 实际上，你需要1000个task去并行执行
      *
      * 所以说，在这里，就可以对Spark SQL刚刚查询出来的RDD执行repartition重分区操作
      */
    //		return actionDF.javaRDD().repartition(1000);
    val rdd :RDD[(String,Row)] =actionDF.rdd.map(row=> (row(2).toString,row) )
    rdd
  }
}
